/*
 * FilePath     : \src\hooks\useMQ.ts
 * Author       : 苏军志
 * Date         : 2024-01-16 08:32
 * LastEditors  : 苏军志
 * LastEditTime : 2024-01-16 09:25
 * Description  : MQ客户端封装
 * CodeIterationRecord:
 */

/* eslint-disable */
import { Client } from "@stomp/stompjs";
export function useMQ() {
  const mqSetting = common.session("mqSetting");
  /**
   * @description: Q客户端 消息队列
   * @see https://www.rabbitmq.com/stomp.html
   * @see https://github.com/stomp-js/stompjs
   * @see https://stomp-js.github.io/api-docs/latest/classes/Client.html
   */
  class MqClient {
    private subscriptions: Record<string, any>;
    private onConnectCallbacks: Function[];
    private isConnected: boolean;
    private client: Client;
    /**
     * @description: 构造函数，连接MQBroker，
     * @return
     */
    constructor() {
      // 已订阅的队列，key为队列名称，value为订阅后返回的接口实现，接口提供了一个方法用于取消订阅
      this.subscriptions = {};
      this.onConnectCallbacks = [];
      this.isConnected = false;
      this.client = new Client({
        brokerURL: mqSetting.url,
        connectHeaders: {
          login: mqSetting.user,
          passcode: mqSetting.password
        },
        onConnect: () => {
          this.isConnected = true;
          console.log("%c已连接MQ服务器", "color:#32cd32");
          // 连接建立后，执行所有回调
          this.onConnectCallbacks.forEach((callback) => {
            callback();
          });
          this.onConnectCallbacks = [];
        },
        onDisconnect: () => {
          this.isConnected = false;
          console.log("%c已断开MQ服务器连接", "color:#32cd32");
        },
        /**
         * @description: 这个回调函数用于处理STOMP协议级别的错误。
         * 例如，当STOMP服务器发送了一个错误帧（ERROR frame）响应非法的STOMP帧或者命令时，此回调将被触发。
         * STOMP错误通常与STOMP命令不正确、订阅地址不存在或服务器不能处理请求等情况有关。
         * @return
         */
        onStompError: (frame) => {
          console.error("MQ服务器报告了一个错误：" + frame.headers["message"]);
          console.error("额外信息：" + frame.body);
        },
        onWebSocketError: (error) => {
          console.error("MQ连接出现错误！", error);
          // TODO：重连，最好设置一个重连次数，超过次数后不再重连
        },
        heartbeatIncoming: 2000,
        heartbeatOutgoing: 2000
      });
      // 初始化与MQ服务器的连接
      this.client.activate();
      // 监听beforeunload事件，当页面刷新或关闭时断开WebSocket连接
      window.addEventListener("beforeunload", () => {
        this.client.deactivate();
      });
    }
    /**
     * @description: 接收广播消息
     * @param queue 队列名称
     * @param routingKey 可选，路由键
     * @param callback 回调函数，用于处理接收到的消息
     * @return
     */
    subscribe(queue: string, routingKey: string, callback: Function) {
      const subscribeFunc = () => {
        const subscription = this.client.subscribe(`/exchange/${queue}/${routingKey}`, (message) => {
          const msg = JSON.parse(message.body) || message.body;
          callback(msg);
        });
        this.subscriptions[`/exchange/${queue}/${routingKey}`] = subscription;
      };
      this.consume(subscribeFunc);
    }
    /**
     * @description: 接收消息
     * @param queue 队列名称
     * @param callback 回调函数，用于处理接收到的消息
     * @return
     */
    receive(queue: string, callback: Function) {
      const subscribeFunc = () => {
        const subscription = this.client.subscribe(`/queue/${queue}`, (message) => {
          const msg = JSON.parse(message.body) || message.body;
          callback(msg);
        });
        this.subscriptions[`/queue/${queue}`] = subscription;
      };
      this.consume(subscribeFunc);
    }
    /**
     * @description: 若Broker已连接，则立即执行func，否则将func放入队列，待连接建立后执行
     * @param func 订阅函数
     * @return
     */
    consume(func: Function) {
      if (this.isConnected) {
        func();
      } else {
        this.onConnectCallbacks.push(func);
      }
    }
    /**
     * @description: 取消订阅
     * @param queue 队列名称
     * @param routingKey 可选，路由键
     * @return
     */
    unsubscribe(queue: string, routingKey?: string) {
      let destination = undefined;
      if (!routingKey) {
        destination = `/queue/${queue}`;
      } else {
        destination = `/exchange/${queue}${routingKey ? "/" + routingKey : ""}`;
      }

      if (this.subscriptions[destination]) {
        this.subscriptions[destination].unsubscribe();
        console.error("此目的地未进行订阅:", destination);
      }
      delete this.subscriptions[destination];
    }
    /**
     * @description: 断开WebSocket连接、取消所有订阅
     * @return
     */
    disconnect() {
      if (!this.isConnected) {
        return;
      }
      Object.keys(this.subscriptions).forEach((queue) => {
        this.unsubscribe(queue);
      });
      // 目前取消订阅疑似存在问题，打印目的地未进行订阅时，无法删除Key，此为补救措施
      this.subscriptions = {};
      this.client.deactivate();
    }
  }

  // MQ客户端实例
  let instance: any = undefined;
  return {
    /**
     * mq配置参数
     */
    mqSetting,
    /**
     * @description: 实现单例
     * @return
     */
    getMqClient() {
      if (!instance) {
        instance = new MqClient();
      }
      return instance;
    }
  };
}
